package com.chukun.flink.stream.connector.source;

import com.chukun.flink.stream.bean.KafkaMess;
import com.google.gson.Gson;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Deserialization schema that converts a raw message payload into a {@link KafkaMess}
 * by decoding the bytes as UTF-8 text and binding the resulting JSON with Gson.
 *
 * @author chukun
 * @version 1.0.0
 * @description Kafka message deserialization utility class
 * @createTime 2022-05-28 18:32:00
 */
public class KafkaDeserializationSchema extends AbstractDeserializationSchema<KafkaMess> {

    /**
     * Shared JSON binder, created once instead of once per record.
     * Kept static so it is not part of this schema's serialized form
     * (Gson itself is not Serializable); Gson instances are documented as thread-safe.
     */
    private static final Gson GSON = new Gson();

    /**
     * Deserializes one message payload.
     *
     * @param bytes the raw payload; may be {@code null} or empty
     * @return {@code null} when {@code bytes} is {@code null} or has length zero,
     *         otherwise whatever {@code Gson.fromJson} produces for the decoded JSON text
     * @throws IOException declared by the overridden method; this implementation does not
     *                     throw it itself. Malformed JSON is reported by Gson through its
     *                     own unchecked exception, which is deliberately left to propagate.
     */
    @Override
    public KafkaMess deserialize(byte[] bytes) throws IOException {
        // Nothing to parse: signal "no record" to the caller with null.
        if (ArrayUtils.isEmpty(bytes)) {
            return null;
        }
        // Decode explicitly as UTF-8 so the result does not depend on the platform charset.
        String json = new String(bytes, StandardCharsets.UTF_8);
        return GSON.fromJson(json, KafkaMess.class);
    }
}
